package com.linkedin.parseq.internal;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.log4j.Logger;
import org.slf4j.event.Level;
import org.testng.annotations.Test;

import com.linkedin.parseq.BaseEngineTest;
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;


/**
 * Tests for {@link ExecutionMonitor}.
 *
 * <p>{@link #testStallsWithStuckThread()} and {@link #testStallsWithoutStuckThread()} drive a standalone
 * monitor through a {@code TestClock} built from two lambdas that poll capacity-one blocking queues. Going by
 * the comments of the original test, one queue feeds the values returned for time reads and the other one
 * releases the monitor's sleep. Since both queues hold a single element, every {@code put} blocks until the
 * previously offered element has been taken, which keeps the test thread in step with whatever thread is
 * polling the queues (presumably the monitoring thread - confirm against {@code ExecutionMonitor}).
 *
 * <p>The exact order and number of {@code put} calls is therefore significant: a missing element makes a
 * {@code poll} time out after 5 seconds, while an extra element blocks the test thread.
 *
 * <p>{@link #testThreadDump()} uses the engine provided by {@link BaseEngineTest} with
 * {@link Engine#MONITOR_EXECUTION} switched on.
 */
public class TestExecutionMonitor extends BaseEngineTest {

  /**
   * Clock advance used for a regular monitoring step. The original test describes it as a
   * "5ms delay but still not a stall".
   */
  private static final long REGULAR_STEP_NANOS = TimeUnit.MILLISECONDS.toNanos(15);

  /**
   * Clock advance used for a step that is expected to be recorded as a stall. The original test describes it
   * as a "15ms delay over smallest observed delay (15ms)".
   */
  private static final long STALLED_STEP_NANOS = TimeUnit.MILLISECONDS.toNanos(30);

  /** Stall duration the assertions expect the monitor to record for every stalled step (15ms). */
  private static final long EXPECTED_STALL_NANOS = TimeUnit.MILLISECONDS.toNanos(15);

  /** Enables execution monitoring on the engine built for this test class. */
  @Override
  protected void customizeEngine(EngineBuilder engineBuilder) {
    engineBuilder.setEngineProperty(Engine.MONITOR_EXECUTION, true);
  }

  /**
   * Performs a single monitoring step: releases the sleep and then supplies the same clock value twice,
   * once for the time read at the start of the step and once for the time read at its end.
   *
   * @param sleepQueue queue polled by the clock's sleep callback
   * @param timerQueue queue polled by the clock's time callback
   * @param nano clock value, in nanoseconds, reported for both time reads of this step
   * @throws InterruptedException if interrupted while waiting for space in either queue
   */
  private void advance(BlockingQueue<Object> sleepQueue, BlockingQueue<Long> timerQueue, long nano)
      throws InterruptedException {
    sleepQueue.put(this); //unblock sleep
    timerQueue.put(nano);  //first nano check
    timerQueue.put(nano);  //nano check at the end
  }

  /**
   * Simulates a thread that is activated once and stays active for the whole test. Expects a log event after
   * roughly one second of simulated time, no further event while less than a minute has passed since, a second
   * event after that, recorded stalls while steps take 30ms, and an empty stall list after the thread has been
   * deactivated and another simulated minute has elapsed.
   */
  @Test
  public void testStallsWithStuckThread() throws InterruptedException, BrokenBarrierException, TimeoutException {
    final TestLogAppender logAppender = new TestLogAppender();
    Logger.getLogger(ExecutionMonitor.class).addAppender(logAppender);

    try {

      final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(1);
      final BlockingQueue<Long> timerQueue = new LinkedBlockingQueue<>(1);

      final TestClock clock = new TestClock(() -> timerQueue.poll(5, TimeUnit.SECONDS),
          nano -> queue.poll(5, TimeUnit.SECONDS));

      // NOTE(review): the meaning of the individual constructor arguments is not visible in this file.
      final ExecutionMonitor monitor = new ExecutionMonitor(1024, TimeUnit.SECONDS.toNanos(1),
          TimeUnit.MILLISECONDS.toNanos(10), TimeUnit.MINUTES.toNanos(1), TimeUnit.MINUTES.toNanos(1),
          TimeUnit.MILLISECONDS.toNanos(10), 1024, Level.WARN, clock);

      long nano = 0;
      // number of regular steps after which more than one second of simulated time has passed
      final long iAtFirstLog = TimeUnit.SECONDS.toNanos(1) / REGULAR_STEP_NANOS + 1;

      timerQueue.put(nano);  //monitor needs initial value
      timerQueue.put(nano);  //activate needs to know time
      monitor.getLocalMonitorState().activate();  //simulate thread that is stuck

      for (long i = 0; i < iAtFirstLog; i++) {
        nano += REGULAR_STEP_NANOS;
        advance(queue, timerQueue, nano);
      }

      assertEquals(monitor.getStallsSince(0), 0);
      assertEquals(logAppender.getNumberOfLogEvents(), 0);

      timerQueue.put(nano);  //additional read consumed by logger
      timerQueue.put(nano);  //additional read consumed by logger

      //one additional step to make sure log event got generated by a monitoring thread
      nano += REGULAR_STEP_NANOS;
      advance(queue, timerQueue, nano);

      assertEquals(logAppender.getNumberOfLogEvents(), 1);

      // continue with regular steps until 1000 steps have been made in total
      final long remainingRegularSteps = 1000 - iAtFirstLog - 1;
      for (long i = 0; i < remainingRegularSteps; i++) {
        nano += REGULAR_STEP_NANOS;
        advance(queue, timerQueue, nano);
      }

      assertEquals(monitor.getStallsSince(0), 0);
      assertEquals(logAppender.getNumberOfLogEvents(), 1, "First log event should be generated after " +
          iAtFirstLog + " iterations");

      // simulated time left until one minute has passed since the first log event
      long nanosUntilNextLog = TimeUnit.MINUTES.toNanos(1) + (iAtFirstLog * REGULAR_STEP_NANOS) - nano;

      final long iAtSecondLog = nanosUntilNextLog / STALLED_STEP_NANOS + 1;

      for (long i = 0; i < iAtSecondLog; i++) {
        nano += STALLED_STEP_NANOS;
        advance(queue, timerQueue, nano);
      }

      assertEquals(logAppender.getNumberOfLogEvents(), 1);

      timerQueue.put(nano);  //additional read consumed by logger
      timerQueue.put(nano);  //additional read consumed by logger

      //one additional step to make sure log event got generated by a monitoring thread
      nano += STALLED_STEP_NANOS;
      advance(queue, timerQueue, nano);

      assertEquals(logAppender.getNumberOfLogEvents(), 2);
      // at most 1024 stalls are expected to be retained, each of them 15ms long
      assertEquals(monitor.getStallsSince(0), Math.min(1024, iAtSecondLog) * EXPECTED_STALL_NANOS);

      timerQueue.put(nano);  //time read consumed by deactivate (presumably, mirroring activate)
      monitor.getLocalMonitorState().deactivate();  //unstuck thread

      //make sure stalls list gets trimmed
      final long stepsPerMinute = TimeUnit.MINUTES.toNanos(1) / STALLED_STEP_NANOS;
      for (long i = 0; i < stepsPerMinute; i++) {
        nano += STALLED_STEP_NANOS;
        advance(queue, timerQueue, nano);
      }

      assertEquals(monitor.getStallsSince(0), 0);

    } finally {
      Logger.getLogger(ExecutionMonitor.class).removeAppender(logAppender);
    }
  }

  /**
   * Simulates a thread that is re-activated on every tenth step and therefore never looks stuck. Expects no
   * log events at all and, after the 30ms steps, only the stalls described by the assertion message below.
   */
  @Test
  public void testStallsWithoutStuckThread() throws InterruptedException, BrokenBarrierException, TimeoutException {
    final TestLogAppender logAppender = new TestLogAppender();
    Logger.getLogger(ExecutionMonitor.class).addAppender(logAppender);

    try {

      final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(1);
      final BlockingQueue<Long> timerQueue = new LinkedBlockingQueue<>(1);

      final TestClock clock = new TestClock(() -> timerQueue.poll(5, TimeUnit.SECONDS),
          nano -> queue.poll(5, TimeUnit.SECONDS));

      // NOTE(review): the meaning of the individual constructor arguments is not visible in this file.
      final ExecutionMonitor monitor = new ExecutionMonitor(1024, TimeUnit.SECONDS.toNanos(1),
          TimeUnit.MILLISECONDS.toNanos(10), TimeUnit.MINUTES.toNanos(1), TimeUnit.MINUTES.toNanos(1),
          TimeUnit.MILLISECONDS.toNanos(10), 1024, Level.WARN, clock);

      long nano = 0;

      // two initial time reads: one for the monitor's initial value and one for activate()
      timerQueue.put(nano);
      timerQueue.put(nano);
      monitor.getLocalMonitorState().activate();  //simulate thread becoming active

      for (int i = 0; i < 1000; i++) {
        nano += REGULAR_STEP_NANOS;
        advance(queue, timerQueue, nano);
        if (i % 10 == 9) {
          timerQueue.put(nano);  //activate needs to know time
          monitor.getLocalMonitorState().activate();  //re-activate thread on every tenth step
        }
      }

      assertEquals(monitor.getStallsSince(0), 0);
      assertEquals(logAppender.getNumberOfLogEvents(), 0);

      for (int i = 0; i < 1000; i++) {
        nano += STALLED_STEP_NANOS;
        advance(queue, timerQueue, nano);
        if (i % 10 == 9) {
          timerQueue.put(nano);  //activate needs to know time
          monitor.getLocalMonitorState().activate();  //re-activate thread on every tenth step
        }
      }

      assertEquals(monitor.getStallsSince(0), 10L * EXPECTED_STALL_NANOS, "JVM stalls are trimed according to list of "
          + "active threads. Only stalls that can possibly affect active threads are kept. In this test thread is "
          + "being activated every 100ms. Since monitoring interval is 10ms it means that 10 stalls are expected "
          + "to be found and thus total stalls hsould be 10 * 15ms.");

      assertEquals(logAppender.getNumberOfLogEvents(), 0);

    } finally {
      Logger.getLogger(ExecutionMonitor.class).removeAppender(logAppender);
    }
  }


  /**
   * Runs 100 trivial tasks followed by one task that sleeps for two seconds on the engine under test, and
   * expects exactly one WARN log event from the {@code ExecutionMonitor} logger that mentions both the
   * long-running-threads header and {@code Thread.sleep}.
   */
  @Test
  public void testThreadDump() {
    final TestLogAppender logAppender = new TestLogAppender();
    Logger.getLogger(ExecutionMonitor.class).addAppender(logAppender);

    try {

      for (int i = 0; i < 100; i++) {
        run(Task.value(null));
      }

      // a task that sleeps for two seconds while it is being executed
      runAndWait(Task.action(() -> Thread.sleep(2000)));

      final String monitorLoggerName = "com.linkedin.parseq.internal.ExecutionMonitor";

      assertEquals(logAppender.getNumberOfLogEvents(), 1);
      assertTrue(logAppender.logEventAtIndexMatchesCriteria(0, monitorLoggerName, org.apache.log4j.Level.WARN,
          "Found ParSeq threads running longer than", null));
      assertTrue(logAppender.logEventAtIndexMatchesCriteria(0, monitorLoggerName, org.apache.log4j.Level.WARN,
          "Thread.sleep", null));

    } finally {
      Logger.getLogger(ExecutionMonitor.class).removeAppender(logAppender);
    }
  }

}
